package com.xiang.ad.mysql;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.xiang.ad.mysql.listener.AggregationListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Created by xiang.
 * 实现监听的启动 与 断开
 */
@Slf4j
@Component
public class BinlogClient {

    private BinaryLogClient client;

    //注入配置信息 以及 监听器
    private final BinlogConfig config;
    private final AggregationListener listener;

    @Autowired
    public BinlogClient(BinlogConfig config, AggregationListener listener) {
        this.config = config;
        this.listener = listener;
    }

    // 实现监听的启动
    public void connect() {

        //新起一个线程，因为之前的测试用例最下面，那个会阻塞在那不动
        new Thread(() -> {
            client = new BinaryLogClient(
                    config.getHost(),
                    config.getPort(),
                    config.getUsername(),
                    config.getPassword()
            );

            //
            if (!StringUtils.isEmpty(config.getBinlogName()) &&
                    !config.getPosition().equals(-1L)) {
                //从当前位置开始监听
                client.setBinlogFilename(config.getBinlogName());
                client.setBinlogPosition(config.getPosition());
            }
            //将listener 注入，给client注册监听器
            client.registerEventListener(listener);

            try {
                log.info("connecting to mysql start");
                client.connect();//启动
                log.info("connecting to mysql done");
            } catch (IOException ex) {
                ex.printStackTrace();
            }

        }).start();
    }

    // 断开mysql的监听
    public void close() {
        try {
            client.disconnect();//断开
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
